package com.huan.flink.source.file;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;

/**
 * Word-count example that reads lines of text from a file through Flink's {@link FileSource}.
 *
 * <p>Each line is split on single spaces, every non-empty token is emitted as a
 * {@code (word, 1)} tuple, the stream is keyed by the word, the counts are summed per key and
 * the result is printed.
 *
 * <p>Usage: the first program argument, when present and non-empty, is the path of the local
 * text file to read; otherwise {@link #DEFAULT_FILE_PATH} is used.
 *
 * @author huan.fu
 * @date 2023/9/17 - 22:53
 */
public class FlinkFileSourceApplication {

    /** Local file that is read when no path is passed on the command line. */
    private static final String DEFAULT_FILE_PATH =
            "/Users/huan/code/IdeaProjects/me/spring-cloud-parent/flink/flink-operator-source/src/main/resources/word.txt";

    /** Name given to the source operator; any meaningful value works. */
    private static final String SOURCE_NAME = "fileSource";

    /**
     * Builds and executes the word-count job.
     *
     * @param args optional; {@code args[0]} is the path of the local text file to read
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment.
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Resolve the input file: prefer the command-line argument so the job is not tied to
        //    one developer's machine, and fall back to the original default path otherwise.
        String filePath = DEFAULT_FILE_PATH;
        if (args != null && args.length > 0 && args[0] != null && !args[0].isEmpty()) {
            filePath = args[0];
        }

        // Build a FileSource that yields the file content one text line at a time.
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(new File(filePath)))
                .build();

        // Source API: no watermarks are generated for this source.
        environment.fromSource(fileSource, WatermarkStrategy.noWatermarks(), SOURCE_NAME)
                // 3. Split every line into words and emit one (word, 1) tuple per word.
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (line, collector) -> {
                    // Lines are split on single spaces.
                    for (String word : line.split(" ")) {
                        // Blank lines, leading spaces and consecutive spaces yield empty tokens;
                        // they are not words, so they must not be counted.
                        if (word.isEmpty()) {
                            continue;
                        }
                        // Send the tuple downstream through the collector.
                        collector.collect(Tuple2.of(word, 1));
                    }
                })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // 4. Group by word; f0 is the first field of the Tuple2.
                .keyBy(tuple2 -> tuple2.f0)
                // 5. Aggregate inside every group; position 1 is the second field of the Tuple2.
                .sum(1)
                // 6. Print the result.
                .print();

        environment.execute("read-from-file");
    }
}
